#!/usr/bin/perl
use strict;
use warnings;

# Patch 'Socket' module - on at least one of my devel systems, when using Net::Server::* as a 'net_server',
# the following error is output right when the connection starts and the process dies:
#	Bad arg length for Socket::unpack_sockaddr_in, length is 28, should be 16 at /usr/lib/perl5/5.8.8/i386-linux-thread-multi/Socket.pm line 370.
# By wrapping unpack_sockaddr_in(), I can trap the error and continue on.
# The code below in sockaddr_in() is a direct copy-paste from Socket.pm, with only the eval{} and die() calls added.
package Socket;
{
	sub sockaddr_in 
	{
		if (@_ == 6 && !wantarray) { # perl5.001m compat; use this && die
			my($af, $port, @quad) = @_;
			warnings::warn "6-ARG sockaddr_in call is deprecated"
			if warnings::enabled();
			pack_sockaddr_in($port, inet_aton(join('.', @quad)));
		} elsif (wantarray) {
			croak "usage:   (port,iaddr) = sockaddr_in(sin_sv)" unless @_ == 1;
			eval { unpack_sockaddr_in(@_); };
			die $@ if $@ && $@ !~ /Bad arg length for Socket::unpack_sockaddr_in/;
		} else {
			croak "usage:   sin_sv = sockaddr_in(port,iaddr))" unless @_ == 2;
			pack_sockaddr_in(@_);
		}
	}
};

package HashNet::StorageEngine::PeerServer;
{
	use HTTP::Server::Simple::CGI;
	use base qw(HTTP::Server::Simple::CGI);
	
	use HashNet::StorageEngine;

	use Storable qw/lock_nstore lock_retrieve/;
	use LWP::Simple qw/get getstore/;
	use Data::Dumper;
	use URI::Escape;
	use URI; # to parse URLs given to is_this_peer
	use Socket; # for gethostbyname() in 3_peer
	use JSON::PP qw/encode_json decode_json/;
	use AnyEvent::Watchdog::Util;
		
	sub peer_port { 8031 }
	#sub peer_url  { 'http://localhost:' . peer_port() . '/db' }
	
	
	my @IP_LIST_CACHE;
	sub my_ip_list
	{
		return @IP_LIST_CACHE if @IP_LIST_CACHE;
	
		# Based on code from http://www.perlmonks.org/?node_id=53660
		my $interface;
		my %ifs;
			
		foreach ( qx{ (LC_ALL=C /sbin/ifconfig -a 2>&1) } ) 
		{
			$interface = $1 if /^(\S+?):?\s/;
			next unless defined $interface;
			$ifs{$interface}->{state} = uc($1) if /\b(up|down)\b/i;
			# NOTE Yes, I know - need to find a way to make compat with IPv6
			$ifs{$interface}->{ip}    = $1     if /inet\D+(\d+\.\d+\.\d+\.\d+)/i;
		}
		
		@IP_LIST_CACHE = ();
		foreach my $if (keys %ifs)
		{
			next if $ifs{$if}->{state} ne 'UP';
			my $ip = $ifs{$if}->{ip} || '';
			
			push @IP_LIST_CACHE, $ip if $ip;
		}
		
		return grep { $_ ne '127.0.0.1' } @IP_LIST_CACHE;
	}
	
	
	sub is_this_peer
	{
		my $class = shift;
		my $url = shift;
		
		my $uri  = URI->new($url)->canonical;
		my $host = $uri->host;
		my $url_ip =
			# NOTE Yes, I know - this regex doesn't define a 'valid' IPv4 address, and ignores IPv6 completely...
			$host =~ /^(\d+\.\d+\.\d+\.\d+)$/ ? $host :
			inet_ntoa(gethostbyname($host));
		
		#print STDERR "[DEBUG] is_this_peer: url: $url\n";
		#print STDERR "[DEBUG] is_this_peer: host:$host, url_ip:$url_ip\n";
		
		my @ip_list = my_ip_list();
		foreach my $ip (@ip_list)
		{
			return 1 if $ip eq $url_ip;
		}
		
		#print STDERR "[DEBUG] is_this_peer: $url is not this peer.\n";
		return 0;
	}

	my HashNet::StorageEngine::PeerServer $ActivePeerServer = undef;
	sub active_server { $ActivePeerServer }
	
	sub net_server { 'Net::Server::PreFork' }
	
	sub reg_peer
	{
		my $self = shift;
		my $peer = shift;
		
		my $url = $peer->url . '/reg_peer';

		if($peer->host_down)
		{
			print STDERR "[TRACE] PeerServer: reg_peer(): Not registering with peer at $url, host marked as down\n";
			return;
		}
		
		if($self->is_this_peer($url))
		{
			print STDERR "[TRACE] PeerServer: reg_peer(): Not registering with peer at $url, same as this host\n";
			return;
		}

		my @discovery_urls = map { 'http://'.$_.':'.peer_port().'/db' } grep { $_ ne '127.0.0.1' } my_ip_list();
		
		my $payload = "peer_url=".uri_escape(join('|', @discovery_urls));
		#my $payload_encrypted = $payload; #HashNet::Cipher->cipher->encrypt($payload);
		
		# LWP::Simple
		my $final_url = $url . '?' . $payload;
		print STDERR "[TRACE] PeerServer: reg_peer(): Trying to register as a peer at url: $final_url\n";

		my $r = get($final_url);
		$r ||= '';
		if($r eq '')
		{
			print STDERR "[TRACE] PeerServer: reg_peer(): No valid response when trying to register with peer at $url, marking as host_down\n";
			$peer->{host_down} = 1;
		}
		elsif(!$r)
		{
			$peer->{pull_only} = 1;
			print STDERR "[TRACE] PeerServer: reg_peer(): Peer $url cannot reach me to push transactions, I must pull from it (flagging as pull-only.)\n";
		}
		else
		{
			$r =~ s/[\r\n]$//g;
			$peer->{known_as} = $r =~ /http:/ ? $r : 0;
			print STDERR "[TRACE] PeerServer: reg_peer(): Registered with $url", ($peer->{known_as} ? " as: $peer->{known_as}" : ""), "\n";
		}

		# Check against the peer to see if they have newer software than we have
		$self->update_software($peer);
	}

	sub update_software
	{
		my $self = shift;
		my $peer = shift;

		if($peer->host_down)
		{
			print STDERR "[TRACE] PeerServer: update_software(): Not checking version with peer at $peer->{url}, host marked as down\n";
			return;
		}
		
		if($self->is_this_peer($peer->url))
		{
			print STDERR "[TRACE] PeerServer: update_software(): Not checking version with peer at $peer->{url}, same as this host\n";
			return;
		}

		my $ver = $self->check_version($peer);
		if($ver > 0)
		{
			print STDERR "[TRACE] PeerServer: update_software(): Updated version '$ver' available, downloading from peer...\n";
			$self->download_upgrade($peer);
		}
		elsif($ver == 0)
		{
			print STDERR "[TRACE] PeerServer: update_software(): Checked peer software version, running same or newer version.\n";
		}
	}

	sub check_version
	{
		my $self = shift;
		my $peer = shift;

		if($peer->host_down)
		{
			print STDERR "[TRACE] PeerServer: check_version(): Not checking version with peer at $peer->{url}, host marked as down\n";
			return;
		}
		
		if($self->is_this_peer($peer->url))
		{
			print STDERR "[TRACE] PeerServer: check_version(): Not checking version with peer at $peer->{url}, same as this host\n";
			return;
		}

		my $ver = $HashNet::StorageEngine::VERSION;
		my $url = $peer->url . '/ver?upgrade_check=' . $ver;
		my $json = get($url);
		if(!$json)
		{
			# TODO Should we mark host as down?
			print STDERR "[TRACE] PeerServer: check_version(): No valid version data from $peer->{url}\n";
			return -1;
		}
		
		my $data = decode_json($json);

		if($data->{has_new} &&
		   $data->{has_bin})
		{
			return $data->{version};
		}

		return 0;
	}

	sub download_upgrade
	{
		my $self = shift;
		my $peer = shift;

		my $upgrade_url = $peer->url . '/bin_file';

		if($peer->host_down)
		{
			print STDERR "[TRACE] PeerServer: download_upgrade(): Not download upgrade from peer at $upgrade_url, host marked as down\n";
			return;
		}
		
		if($self->is_this_peer($peer->url))
		{
			print STDERR "[TRACE] PeerServer: download_upgrade(): Not download upgrade from peer at $upgrade_url, same as this host\n";
			return;
		}

		my $bin_file = $self->bin_file;
		if(!$bin_file)
		{
			print STDERR "[TRACE] PeerServer: download_upgrade(): Cannot download upgrade from $upgrade_url - no bin_file set.\n";
			return;
		}

		print STDERR "[INFO]  PeerServer: download_upgrade(): Downloading update from $upgrade_url to $bin_file\n";

		getstore($upgrade_url, $bin_file);

		print STDERR "[INFO]  PeerServer: download_upgrade(): Download finished.\n";

		$self->request_restart;
	}
	
	sub request_restart
	{

		# attempt restart
		print STDERR "[INFO]  PeerServer: request_restart(): Restarting server\n";
		
		my $peer_port = peer_port();
# 		print "#######################\n";
# 		system("lsof -i :$peer_port");
# 		print "#######################\n";
# 		print "$0\n";
# 		print "#######################\n";
		 
		foreach ( qx{ lsof -i :$peer_port | grep -P '(perl|$0)' } )
		{
			my ($pid) = (split /\s+/)[1];
			next if $pid eq 'PID' || $pid eq $$;  # first line of lsof has the header PID, etc
			
			# Grab the process description just for debugging/info purposes
			my @lines = split /\n/, `ps $pid`;
			shift @lines;
			
			if(@lines)
			{
				print STDERR "[INFO]  PeerServer: request_restart(): Killing child: $pid\n";
				print STDERR "[INFO]              $lines[0]\n";
				
				# Actually kill the process
				kill 9, $pid;
			}
		}

		print STDERR "[INFO]  PeerServer: request_restart(): All children killed, killing self: $$\n";
		
		if(fork)
		{
			# Kill the parent in a fork after one second
			# so the child has time to execute the system() command
			sleep 1;
			kill $$;
		}
		else
		{
			# Close the app output so init messages from restarting (below) don't leak to any possible active HTTP requests
			select(STDOUT);
			close(STDOUT);
			
# 			#print STDERR "\$^X: $^X, \$0: $0, \@ARGV: @ARGV [END]\n";
# 			if($ENV{SCRIPT_FILE})
# 			{
# 				# running as a buildpacked.pl-packed file
# 				system("$0 &");
# 			}
# 			else
# 			{
				# Running as a regular script
				system("$^X $0 @ARGV &");
#			}
			exit;
		}
	}

	sub new
	{
		my $class = shift;
		my $engine = shift;

		my $self = $class->SUPER::new(peer_port());

		$self->{engine} = $engine;
		
		my @peers = @{ $engine->peers };
		foreach my $peer (@peers)
		{
			# Try to register as a peer
			$self->reg_peer($peer);
		}

		$self->{bin_file} = '';
		
		$self->{tr_cache_file} = ".hashnet.peerserver.transactions.$$";
		
		$ActivePeerServer = $self;
		
# 		if(my $pid = fork)
# 		{
# 			print "[TRACE] PeerServer: Forked $pid, returning\n";
# 			$self->{server_pid} = $pid;
# 			return $self;
# 		}

		#$self->run;
		#exit(-1); # we're in a forked copy, exit
		return $self;
	}

	sub engine { shift->{engine} }

	sub bin_file
	{
		my $self = shift;
		$self->{bin_file} = shift if @_;
		$self->{bin_file} ||= '';
		#print STDERR "[DEBUG] PeerServer: bin_file(): $self->{bin_file}\n";
		return $self->{bin_file};
	}
	
	sub has_seen_tr
	{
		my $self = shift;
		my $tr = shift;
		
		my $tr_cache = {};
		$tr_cache = lock_retrieve($self->{tr_cache_file}) if -f $self->{tr_cache_file};
		 
		return 1 if $tr_cache->{$tr->uuid};
	}
	
	sub mark_tr_seen
	{
		my $self = shift;
		my $tr = shift;
		
		my $tr_cache = {};
		$tr_cache = lock_retrieve($self->{tr_cache_file}) if -f $self->{tr_cache_file};
		
		$tr_cache->{$tr->uuid} = 1;
		
		lock_nstore($tr_cache, $self->{tr_cache_file});
	}

	sub DESTROY
	{
		my $self = shift;
		kill 9, $self->{server_pid} if $self->{server_pid};
	}
	
	my %dispatch = (
		'tr_push'  => \&resp_tr_push,
		'get'      => \&resp_get,
		'put'      => \&resp_put,
		'ver'      => \&resp_ver,
		'reg_peer' => \&resp_reg_peer,
		'bin_file' => \&resp_bin_file,
		# ...
	);

	sub peeraddr
	{
		my $self = shift;
		$self->{peeraddr} = shift if @_;
		
		return $self->{peeraddr};
	}
	
	sub handle_request
	{
		my $self = shift;
		my $cgi  = shift;
		
		# from pms-core - havn't tried it in this context yet
		#my $hostinfo = gethostbyaddr($client->peeraddr);
		#$ENV{REMOTE_HOST} = ($hostinfo ? ( $hostinfo->name || $client->peerhost  ) : $client->peerhost);
	
		# If this isn't right, it's okay - it just means that resp_tr_push() will assume the
		# wrong (localhost) IP if the remote peer doesn't give a peer_url().
		# It's also used in resp_reg_peer() for the same thing - to guess the remote peer's peer url
		my $remote_sockaddr = getpeername( $self->stdio_handle );
		my ( $iport, $iaddr ) = $remote_sockaddr ? sockaddr_in($remote_sockaddr) : (undef,undef);
		my $peeraddr = $iaddr ? ( inet_ntoa($iaddr) || "127.0.0.1" ) : '127.0.0.1';
		$self->{peeraddr} = $peeraddr;

		my $path = $cgi->path_info();
		print STDERR "[TRACE] PeerServer: [", $self->peeraddr, "] $path\n";
		
		my @path_parts = split /\//, $path;
		shift @path_parts if !$path_parts[0];
		my $item = shift @path_parts;
		#print STDERR Dumper $item, \@path_parts;
		if($item ne 'db')
		{
			return reply_404($cgi, ': db');
		}
		
		if(!@path_parts)
		{
			return reply_404($cgi, ': Valid DB method not found');
		}

		my $method = shift @path_parts;
		my $handler = $dispatch{$method};

		$cgi->{path_parts} = \@path_parts;
		
		if (ref($handler) eq "CODE")
		{
			print "HTTP/1.0 200 OK\r\n";
			$handler->($self, $cgi);

		}
		else
		{
			reply_404($cgi);
		}
	}

	sub reply_404
	{
		my $cgi = shift;
		my $info = shift;
		$info ||= '';
		print "HTTP/1.0 404 Not found\r\n";
		print $cgi->header,
		      $cgi->start_html('HashNet Not found '.$info),
		      $cgi->h1('HashNet Not found '.$info),
		      $cgi->end_html;
	}

=item C<resp_tr_push>

Handles transactions pushed from peers using
C<HashNet::StorageEngine::Peer::push()>.

After processing that transaction, it executes a software version check 
against all peers to ensure that this server is running the newest version 
of software available.

=cut
	sub resp_tr_push
	{
		my $self = shift;
		my $cgi  = shift;   # CGI.pm object
		return if !ref $cgi;

		my $data = uri_unescape($cgi->param('data'));
		print STDERR "[TRACE] PeerServer: resp_tr_push(): data=$data\n";
		
		#my $tr = HashNet::StorageEngine::TransactionRecord->from_bytes($data);
		my $tr = HashNet::StorageEngine::TransactionRecord->from_json($data);

		# Prevent recusrive updates of this $tr
		if($self->has_seen_tr($tr))
		{
			print STDERR "[TRACE] PeerServer: resp_tr_push(): Already seen ", $tr->uuid, " - not processing\n";
		}
		else
		{
			# Flag it as seen - mark_tr_seen() will sync across all threads
			$self->mark_tr_seen($tr);
			
			# Get the ip to guess the peer_url if not provided
			my $peer_ip  = $self->peeraddr || '';
			$peer_ip = (my_ip_list())[0] if !$peer_ip || $peer_ip eq '127.0.0.1';
			
			# peer_url is used to tell _push_tr() to not send the same tr back to the peer that sent it to us
			my $peer_url = $cgi->param('peer_url') || 'http://' . $peer_ip . ':' . peer_port() . '/db' ;
			
			# If the tr is valid...
			print STDERR "[TRACE] PeerServer: resp_tr_push(): ", $tr->key, " => ", $tr->data, " (from $peer_url)\n";
			if(defined $tr->key)
			{
				my $eng = $self->engine;
			
				# We dont use eng->put() here because it constructs a new tr
				$eng->_put_local($tr->key, $tr->data);
				$eng->_push_tr($tr, $peer_url); # peer_url is the url of the peer to skip when using it out to peers
			}

			# Check our version of software against all peers to make sure we have the latest version
			my @peers = @{ $self->engine->peers };
			foreach my $peer (@peers)
			{
				# Check software version against each peer
				$self->update_software($peer);
			}
		}
		
		print   "Content-Type: text/plain\r\n\r\n",
			"Received $tr: ", $tr->key, " => ", $tr->data, "\n";
	}

	sub resp_get
	{
		my $self = shift;
		my $cgi  = shift;   # CGI.pm object
		return if !ref $cgi;

		my $key = $cgi->param('key') || '/'. join('/', @{$cgi->{path_parts} || []});
		my $value = $self->engine->get($key);
		$value ||= ''; # avoid warnings
		
		print STDERR "[TRACE] PeerServer: resp_get($key): $value\n";
		print "Content-Type: text/plain\r\n\r\n", $value, "\n";
	}
	
	sub resp_put
	{
		# TODO add some sort of authentication for putting
		
		my $self = shift;
		my $cgi  = shift;   # CGI.pm object
		return if !ref $cgi;

		my $key = $cgi->param('key') || '/'. join('/', @{$cgi->{path_parts} || []});
		my $value = uri_unescape($cgi->param('data'));

		$self->engine->put($key, $value);

		print STDERR "[TRACE] PeerServer: resp_put($key): $value\n";
		print "Content-Type: text/plain\r\n\r\n", $value, "\n";
	}
	
	sub resp_ver
	{
		my $self = shift;
		my $cgi  = shift;   # CGI.pm object
		return if !ref $cgi;

		my $ver = $HashNet::StorageEngine::VERSION;

		my $upgrade_check = $cgi->param('upgrade_check');
		if(defined $upgrade_check)
		{
			$upgrade_check += 0;
			$ver += 0;
			
			my $has_new = $ver > $upgrade_check ? 1:0;
			my $has_bin = $self->bin_file       ? 1:0;
			my $json = encode_json({has_new => $has_new, has_bin => $has_bin, version => $ver, ver_string => "HashNet StorageEngine Version $ver"});
			
			print STDERR "[TRACE] PeerServer: resp_ver(): $ver, remote ver: $upgrade_check, json:$json\n";
			print "Content-Type: text/plain\r\n\r\n";
			print $json;
			return;
		}
		
		print STDERR "[TRACE] PeerServer: resp_ver(): $ver\n";
		print "Content-Type: text/plain\r\n\r\n", "HashNet StorageEngine Version $ver\n";
		
		# Just here to test restart logic
		#$self->request_restart;
	}

	sub resp_bin_file
	{
		my $self = shift;
		my $cgi = shift;
		return if !ref $cgi;

		my $bin_file = $self->bin_file;
		if(!-f $bin_file || !open(F, "<$bin_file"))
		{
			print "Content-Type: text/plain\r\n\r\nUnable to read bin_file or no bin_file defined\n";
			return;
		}
		
		print "Content-Type: application/octet-stream\r\n\r\n";
		print $_ while $_ = <F>;
		close(F);
	}
	
	sub resp_reg_peer
	{
		my $self = shift;
		my $cgi  = shift;   # CGI.pm object
		return if !ref $cgi;

		my $peer_url = $cgi->param('peer_url') || 'http://' . $self->peeraddr . ':' . peer_port() . '/db' ;
		if($peer_url =~ /\|/)
		{
			my @possible = split /\|/, $peer_url;
			
			my $final_url = 0;
			foreach my $possible_url (@possible)
			{
				my $result = $self->engine->add_peer($possible_url);
				
				# >0 means it's added
				# <0 means it's already in the list
				#  0 means we can't reach the host, not a good host
				if($result != 0)
				{
					# We still want to set final_url even if its already in the list
					# so that the peer knows that URL we expect them to provide 
					# for tr_push requests, etc.
					$final_url = $possible_url;
					last;
				}
			}
			
			print STDERR "[DEBUG] PeerServer: resp_reg_peer(): Register options: $peer_url, final_url: $final_url\n";
			print "Content-Type: text/plain\r\n\r\n", $final_url, "\n";
			
			$peer_url = $final_url; # for update checks below
		}
		else
		{
			my $valid = $self->engine->add_peer($peer_url);
			
			# $valid:
			# >0 means it's added
			# <0 means it's already in the list
			#  0 means we can't reach the host, not a good host
			# See discussion on why we still give $peer_url if <0 above
				
			print STDERR "[TRACE] PeerServer: resp_reg_peer(): $peer_url, valid: $valid\n";
			print "Content-Type: text/plain\r\n\r\n", ($valid!=0 ? $peer_url : 0), "\n";
		}
		
		# TODO eventually convert the server to use an event loop so we can set a timer to trigger this code later
		
		# this won't work - since the reg_peer() call on the *peer* server will not complete until we finish our output
		#sleep 1; # give peer time to startup server
# 		my $peer = $self->engine->peer($peer_url);
# 		if($peer)
# 		{
# 			print STDERR "[TRACE] PeerServer: resp_reg_peer(): Checking software against remote peer at $peer_url\n";
# 			$self->update_software($peer);
# 		}
# 		else
# 		{
# 			print STDERR "[TRACE] PeerServer: resp_reg_peer(): Unable to check software against remote peer at $peer_url - couldn't find a Peer object in the engine's peer list\n";
# 		}
	}
};

